package com.uziot.bucket.common.pool;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author shidt
 * @version V1.0
 * @className sas
 * @date 2020-12-30 22:43:30
 * @description 平台内部部分实例默认线程池配置
 */

@Slf4j
public class ThreadPoolFactory {
    /**
     * 当前对象实例
     */
    private static final ThreadPoolFactory INSTANCE = new ThreadPoolFactory();

    /**
     * 线程池对象池
     */
    private final Map<String, ThreadPoolExecutor> pools = new HashMap<>();

    /**
     * 实例化方法调用
     *
     * @return 当前工厂对象
     */
    public static ThreadPoolFactory getInstance() {
        return INSTANCE;
    }

    /**
     * 静态线程池
     */
    private static volatile ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * 静态定时执行线程池
     */
    private static volatile ScheduledExecutorService scheduledExecutorService;

    /**
     * 核心线程池大小
     */
    private static final int CORE_POOL_SIZE = 10;
    /**
     * 最大可创建的线程数
     */
    private static final int MAX_POOL_SIZE = 1000;
    /**
     * 队列最大长度
     */
    private static final int QUEUE_CAPACITY = 2000;
    /**
     * 线程池维护线程所允许的空闲时间
     */
    private static final int KEEP_ALIVE_SECONDS = 30;

    /**
     * 默认静态线程池1
     *
     * @return ThreadPoolTaskExecutor
     */
    public static ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        if (null != threadPoolTaskExecutor) {
            return threadPoolTaskExecutor;
        }
        synchronized (ThreadPoolFactory.class) {
            threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            // 设置核心线程数
            threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
            // 设置最大线程数
            threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
            // 设置队列容量
            threadPoolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
            // 设置线程活跃时间（秒）
            threadPoolTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
            // 线程池对拒绝任务(无线程可用)的处理策略
            // 设置拒绝策略rejection-policy：当pool已经达到max size的时候，
            // 如何处理新任务 CALLER_RUNS：不在新线程中执行任务，而是由调用者所在的线程来执行
            threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 设置默认线程名称
            threadPoolTaskExecutor.setThreadNamePrefix("ST-EXEC-");
            // 等待所有任务结束后再关闭线程池
            threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            // 初始化线程池
            threadPoolTaskExecutor.initialize();
        }
        return threadPoolTaskExecutor;
    }

    /**
     * 默认静态执行周期性或定时任务
     */
    public static ScheduledExecutorService scheduledExecutorService() {
        if (null != scheduledExecutorService) {
            return scheduledExecutorService;
        }
        synchronized (ThreadPoolFactory.class) {
            scheduledExecutorService = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE,
                    new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d")
                            .daemon(true).build()) {
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    super.afterExecute(r, t);
                    Threads.printException(r, t);
                }
            };
        }
        return scheduledExecutorService;
    }


    /**
     * 根据线程池配置文件加载线程池
     *
     * @param poolName      线程池名称
     * @param corePoolSize  核心池最大值
     * @param keepAliveTime 超出核心池线程空闲存活时间，单位秒
     */
    public void addThreadPool(String poolName, String corePoolSize, String keepAliveTime) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                Integer.parseInt(corePoolSize),
                Integer.MAX_VALUE,
                Long.parseLong(keepAliveTime),
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(),
                new CommonThreadFactory(poolName),
                new CommonRejectHandler(poolName));
        this.pools.put(poolName, poolExecutor);
    }

    /**
     * 列出所有线程池，用于监控
     *
     * @return 线程池列表
     */
    public Map<String, ThreadPoolExecutor> listAllPools() {
        return this.pools;
    }

    /**
     * 运行时，根据线程名称获取线程池
     *
     * @param poolName 线程池名称
     * @return 线程池
     */
    public ThreadPoolExecutor getPool(String poolName) {
        if (log.isDebugEnabled()) {
            log.debug("getPool poolName=" + poolName);
        }
        return this.pools.get(poolName);
    }

    /**
     * 通用线程工厂
     *
     * @author leijf
     */
    static class CommonThreadFactory implements ThreadFactory {
        private final String poolName;

        public CommonThreadFactory(String poolName) {
            this.poolName = poolName;
        }

        public String getPoolName() {
            return poolName;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(poolName + "_" + thread.getName());
            return thread;
        }

    }

    /**
     * 通用拒绝线程处理类
     *
     * @author leijf
     */
    static class CommonRejectHandler implements RejectedExecutionHandler {
        private final String poolName;

        public CommonRejectHandler(String poolName) {
            this.poolName = poolName;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (log.isErrorEnabled()) {
                log.error("线程池【" + this.poolName + "】已超出处理上限，或系统资源不够用。");
            }
        }

    }

    public int getCorePoolSize() {
        return CORE_POOL_SIZE;
    }

    public int getMaxPoolSize() {
        return MAX_POOL_SIZE;
    }

    public int getQueueCapacity() {
        return QUEUE_CAPACITY;
    }

    public int getKeepAliveSeconds() {
        return KEEP_ALIVE_SECONDS;
    }

}
